package me.seawenc.datastash.outputs.tcp;

import me.seawenc.datastash.outputs.AbstractConsumer;

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Consumer that delivers each message to a TCP endpoint.
 *
 * <p>The target is read from the {@code output.tcp.host} and {@code output.tcp.port}
 * properties. A new connection is opened for every message and closed once the
 * message has been written.
 *
 * <p>A test listener can be started with:
 * <pre>
 * nc -l -k -p 8888
 * </pre>
 */
public class TcpConsumer extends AbstractConsumer {
    private final String host;
    private final int port;

    /**
     * Reads the TCP target from the given properties.
     *
     * @param props configuration; passed to the superclass and queried for
     *              {@code output.tcp.host} and {@code output.tcp.port}
     * @throws NumberFormatException if {@code output.tcp.port} is missing or is not
     *                               a valid integer
     */
    public TcpConsumer(Properties props) {
        super(props);
        host = props.getProperty("output.tcp.host");
        String p = props.getProperty("output.tcp.port");
        System.out.println("TcpConsumer host="+host+",port="+p);
        port = Integer.parseInt(p);
    }

    /**
     * Sends one message to the configured host and port as UTF-8 bytes.
     *
     * @param msg the message to send
     * @throws Exception if the connection cannot be opened or the write fails
     */
    @Override
    public void consumption(String msg) throws Exception {
        // try-with-resources closes the socket even when the write or flush fails,
        // so a failing endpoint cannot leak connections.
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(msg.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

}
